-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mateusz #1
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that is not your latest changes
I am missing the Latecny Loggers, and Q8 and Q9
However, I pushed my version update as they my help you to fix your code
src/main/java/Q5_NOTPattern.java
Outdated
DataStream<String> result = patternStream.flatSelect(new UDFs.GetResultTuple()); | ||
|
||
result//.print(); | ||
.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no Latency Logger
src/main/java/Q5_NOTPattern.java
Outdated
@@ -0,0 +1,104 @@ | |||
package Q_SubmissionSigmodVLDB; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no package required
src/main/java/Q5_NOTQuery.java
Outdated
@@ -0,0 +1,139 @@ | |||
package Q_SubmissionSigmodVLDB; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm
src/main/java/Q5_NOTQuery.java
Outdated
result //.print(); | ||
.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no Logger
src/main/java/Q5_NOTQuery.java
Outdated
.window(SlidingEventTimeWindows.of(Time.minutes(windowSize), Time.minutes(1))) | ||
.apply(new FlatJoinFunction<Tuple3<KeyedDataPointGeneral, Long, Integer>, Tuple2<KeyedDataPointGeneral, Integer>, Tuple2<KeyedDataPointGeneral, KeyedDataPointGeneral>>() { | ||
@Override | ||
public void join(Tuple3<KeyedDataPointGeneral, Long, Integer> d1, Tuple2<KeyedDataPointGeneral, Integer> d2, Collector<Tuple2<KeyedDataPointGeneral, KeyedDataPointGeneral>> collector) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no HashSet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I checked Q9 and Q8 - for Q9 I took over your solution and made some small changes as I need the classes over night to run on the cluster
I left some comments on the other classes to check
most important if you ever want to merge, you need a git ignore file for our target dir
target/classes/Q1_SEQPattern$1.class
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove target from your pull request
|
||
DataStream<Tuple2<KeyedDataPointGeneral, KeyedDataPointGeneral>> result = velStream.keyBy(new UDFs.getArtificalKey()) | ||
.intervalJoin(quaStream.keyBy(new UDFs.getArtificalKey())) | ||
.between(Time.seconds(1), Time.seconds((windowSize * 60) - 1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found a better way to do it:
.between(Time.minutes(0), Time.minutes(windowSize))
.lowerBoundExclusive()
.upperBoundExclusive()
So I changed that for all IVJ
|
||
DataStream<Tuple2<KeyedDataPointGeneral, Integer>> stream = input | ||
.assignTimestampsAndWatermarks(new UDFs.ExtractTimestamp(60000)) | ||
.map(new UDFs.MapKey()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove that data has a actual key we can use
.intervalJoin(quaStream.keyBy(new UDFs.getArtificalKey())) | ||
.between(Time.seconds(1), Time.seconds((windowsize * 60) - 1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change key assignment
.where(KeyedDataPointGeneral::getKey)
.equalTo(KeyedDataPointGeneral::getKey)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.between(Time.minutes(0), Time.minutes(windowSize))
.lowerBoundExclusive()
.upperBoundExclusive()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, I am a bit confused with the PR right now, can we move now to the master branch as the main branch is now under submission and I do not want to merge anything which is not in the paper to it.
@@ -0,0 +1,92 @@ | |||
import org.apache.flink.api.common.JobExecutionResult; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q9 is now a duplicate, I added all Q9 but I renamed the classes
@@ -0,0 +1,113 @@ | |||
import org.apache.flink.api.common.JobExecutionResult; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q6 is also already there but has other names and I took your Q8 classes, so in essence all should be there now and we can close this PR
public Var8Event(){ | ||
super(); | ||
this.type = "var8"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we can do a bit smarter by adding the String for type to the constructor of the class, then we just need 1 class to create what we want. I particular, I think we can skip all of them and just add the type to the source function of the type
Things that need to be checked of which I'm unsure: